package cn.com.bluemoon.bd.flink.utils;

import cn.com.bluemoon.bd.flink.common.Constants;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;

import java.util.concurrent.atomic.AtomicLong;

/**
 * Static helpers for working with Canal change records inside the Flink jobs.
 *
 * <p>All methods are static; the class keeps no per-instance state.
 */
public class DataUtils {
    /** Counter reserved for utility use; the reference itself never changes. */
    private static final AtomicLong COUNTER = new AtomicLong(0);

    /** Prefix that is removed from {@code canal_bin_log} when deriving a message id. */
    private static final String BIN_LOG_PREFIX = "mysql-bin.";

    /**
     * Parses {@code line} as a JSON object and returns its Canal message id.
     *
     * @param line JSON text of a single record
     * @return the message id, see {@link #createCanalMsgId(JSONObject)}
     * @throws IllegalArgumentException if the text does not parse to a JSON object, or the
     *                                  record carries neither {@code canal_msg_id} nor
     *                                  {@code canal_bin_log}
     * @throws Exception                any failure raised while parsing {@code line}
     */
    public static String createCanalMsgId(String line) throws Exception {
        JSONObject jsonObj = JSON.parseObject(line);
        return createCanalMsgId(jsonObj);
    }

    /**
     * Returns the Canal message id of a record.
     *
     * <p>If the record has a non-blank {@code canal_msg_id}, that value is returned as is.
     * Otherwise the id is assembled as: {@code canal_bin_log} with every occurrence of
     * {@code "mysql-bin."} removed, followed by {@link Constants#DELIMITER}, followed by
     * {@code sys_canal_id}.
     *
     * <p>Note: a missing {@code sys_canal_id} is not rejected; string concatenation renders
     * it as the text {@code null}, exactly as before.
     *
     * @param jsonObj the record to inspect; must not be {@code null}
     * @return the existing or derived message id
     * @throws IllegalArgumentException if {@code jsonObj} is {@code null}, or if
     *                                  {@code canal_msg_id} is blank and
     *                                  {@code canal_bin_log} is absent
     * @throws Exception                declared for backward compatibility with existing callers
     */
    public static String createCanalMsgId(JSONObject jsonObj) throws Exception {
        if (jsonObj == null) {
            throw new IllegalArgumentException("Canal record is null; cannot create canal_msg_id");
        }

        String msgId = jsonObj.getString("canal_msg_id");
        if (!StringUtils.isBlank(msgId)) {
            return msgId;
        }

        String canalBinLog = jsonObj.getString("canal_bin_log");
        String sysCanalId = jsonObj.getString("sys_canal_id");
        if (canalBinLog == null) {
            // Fail with a descriptive message instead of a bare NullPointerException.
            throw new IllegalArgumentException(
                    "Record has neither canal_msg_id nor canal_bin_log; cannot derive canal_msg_id"
                            + " (sys_canal_id=" + sysCanalId + ")");
        }

        String binLogOffset = canalBinLog.replace(BIN_LOG_PREFIX, "");
        return binLogOffset + Constants.DELIMITER + sysCanalId;
    }

    /**
     * Returns the current wall-clock time.
     *
     * @return {@link System#currentTimeMillis()}
     */
    public static long timestamp() {
        return System.currentTimeMillis();
    }

    /**
     * Manual smoke check: prints a sample bin-log name with the {@code "mysql-bin."} prefix
     * removed.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        System.out.println("mysql-bin.008870_185606043".replace("mysql-bin.", ""));
    }
}
